查看原文
其他

万字Apache Druid 入门与实践总结!

The following article is from 云祁QI Author 云祁


前言

大家好!

之前对一款高效的 OLAP 引擎——Apache Druid 做了技术调研,从基础入门知识到 Demo 实践也算是循序渐进了。

今天稍作整理分享给大家,三万多字,有点长,建议收藏再看。

毕竟个人的精力总归是有限的,如果小伙伴对  Druid、Doris、TiDB、ClickHouse、Kylin、Presto ....... 有接触的,不妨分享一些使用的心得体会~

大家一起交流讨论,大家一起进步嘛 😁 

一、基本概念

1.1 什么是Druid

Druid 是一个分布式的支持实时分析的数据存储系统(Data Store)。美国广告技术公司MetaMarkets 于2011 年创建了Druid 项目,并且于2012 年晚期开源了Druid 项目。Druid设计之初的想法就是为分析而生,它在处理数据的规模、数据处理的实时性方面,比传统的OLAP 系统有了显著的性能改进,而且拥抱主流的开源生态,包括Hadoop 等。多年以来,Druid 一直是非常活跃的开源项目。


Druid 的官方网站是 http://druid.io。

另外,阿里巴巴也曾创建过一个开源项目叫作Druid(简称阿里Druid),它是一个数据库连接池的项目。阿里Druid 和本问讨论的Druid 没有任何关系,它们解决完全不同的问题,因此大家不要把两者混淆了。

另外,我们还需要了解的一点:Druid是一个实时多维OLAP分析的数据处理系统。但是OLAP分析又分为关系型联机分析处理(ROLAP)、多维联机分析处理(MOLAP)两种。

简单来说MOLAP需要数据预计算好为一个多维数组,典型的方式是Cube。而ROLAP就是数据本身什么样,就是什么样,通过MPP提高分布式计算能力。

Druid 走的就是 MOLAP。可以快速(实时)访问大量的、很少变化的数据的系统。并被设计为,在面对代码部署、机器故障和生产系统的其他可能性问题时,依旧能 100% 地正常提供服务。

1.2 Druid 的三个设计原则

  • 快速查询(Fast Query):部分数据的聚合(Partial Aggregate)+内存化(In-emory)+索引(Index)。
  • 水平扩展能力(Horizontal Scalability):分布式数据(Distributed Data)+ 并行化查询(Parallelizable Query)。
  • 实时分析(Realtime Analytics):不可变的过去,只追加的未来(Immutable Past,Append-Only Future)。

1.2.1 快速查询(Fast Query)

对于数据分析场景,大部分情况下,我们只关心一定粒度聚合的数据,而非每一行原始数据的细节情况。因此,数据聚合粒度可以是1 分钟、5 分钟、1 小时或1 天等。部分数据聚合(Partial Aggregate)给Druid 争取了很大的性能优化空间。

数据内存化也是提高查询速度的杀手锏。内存和硬盘的访问速度相差近百倍,但内存的大小是非常有限的,因此在内存使用方面要精细设计,比如Druid 里面使用了Bitmap 和各种压缩技术。

另外,为了支持Drill-Down 某些维度,Druid 维护了一些倒排索引。这种方式可以加快AND和OR 等计算操作。

1.2.2 水平扩展能力(Horizontal Scalability)

Druid 查询性能在很大程度上依赖于内存的优化使用。数据可以分布在多个节点的内存中,因此当数据增长的时候,可以通过简单增加机器的方式进行扩容。为了保持平衡,Druid按照时间范围把聚合数据进行分区处理。对于高基数的维度,只按照时间切分有时候是不够的(Druid 的每个Segment 不超过2000 万行),故Druid 还支持对Segment 进一步分区。

历史Segment 数据可以保存在深度存储系统中,存储系统可以是本地磁盘、HDFS 或远程的云服务。如果某些节点出现故障,则可借助Zookeeper 协调其他节点重新构造数据。

Druid 的查询模块能够感知和处理集群的状态变化,查询总是在有效的集群架构中进行。集群上的查询可以进行灵活的水平扩展。

1.2.3 实时分析(Realtime Analytics)

Druid 提供了包含基于时间维度数据的存储服务,并且任何一行数据都是历史真实发生的事件,因此在设计之初就约定事件一但进入系统,就不能再改变。

对于历史数据Druid 以Segment 数据文件的方式组织,并且将它们存储到深度存储系统中,例如文件系统或亚马逊的S3 等。

当需要查询这些数据的时候,Druid 再从深度存储系统中将它们装载到内存供查询使用。

1.3 Druid 的主要特点

  1. 列式存储格式。Druid 使用面向列的存储,这意味着,它只需要加载特定查询所需要的列。这为只查看几列的查询提供了巨大的速度提升。此外,每列都针对其特定的数据类型进行优化,支持快速扫描和聚合。
  2. 可扩展的分布式系统。Druid 通常部署在数十到数百台服务器的集群中,并且提供数百万条/秒的摄取率,保留数百万条记录,以及亚秒级到几秒钟的查询延迟。
  3. 大规模的并行处理。Druid 可以在整个集群中进行大规模的并行查询。
  4. 实时或批量摄取。Druid 可以实时摄取数据(实时获取的数据可立即用于查询)或批量处理数据。
  5. 自愈,自平衡,易操作。集群扩展和缩小,只需添加或删除服务器,集群将在后台自动重新平衡,无需任何停机时间。
  6. 原生云、容错的架构,不会丢失数据。一旦Druid 吸收了您的数据,副本就安全地存储在深度存储中(通常是云存储、HDFS 或共享文件系统)。即使每个Druid 服务器都失败,也可以从深层存储恢复数据。对于仅影响少数Druid 服务器的更有限的故障,复制确保在系统恢复时仍然可以执行查询。
  7. 用于快速过滤的索引。Druid 使用CONCISE 或Roaring 压缩位图索引来创建索引,这些索引可以快速过滤和跨多个列搜索。
  8. 近似算法。Druid 包括用于近似计数、近似排序以及计算近似直方图和分位数的算法。这些算法提供了有限的内存使用,并且通常比精确计算快得多。对于准确度比速度更重要的情况,Druid 还提供精确的计数-明确和准确的排名。
  9. 插入数据时自动聚合。Druid 可选地支持摄取时的数据自动汇总。预先汇总了您的数据,并且可以导致巨大的成本节约和性能提升。

1.4 Druid 的应用场景

Druid 应用最多的是 类似于广告分析创业公司MetaMarkets 中的应用场景,如广告分析、互联网广告系统监控以及网络监控等

当业务中出现以下情况时,Druid 是一个很好的技术方案选择:

  • 需要交互式聚合和快速探究大量数据时;
  • 具有大量数据时,如每天数亿事件的新增、每天数10T 数据的增加;
  • 对数据尤其是大数据进行实时分析时;
  • 需要一个高可用、高容错、高性能数据库时。

目前从 Apache Druid 官网看到有 100 多家企业都在使用,也包括了很多国内的公司,例如 BAT、字节跳动、知乎、优酷、小米、Oppo、有赞、作业帮等等,大概占到 10% 左右。

1.4.1 主要应用

  • 点击流分析(Web 和 移动端分析)

  • 网络遥测分析(网络性能监控)

  • 服务器指标存储

  • 供应链分析(制造指标)

  • 应用程序性能指标

  • 数字营销 / 广告分析

  • 商业智能 / OLAP

1.4.2 适合场景

  • 插入率很高,但更新并不常见
  • 大多数查询都是聚合查询和 groupBy 分组查询,包括搜索和扫描查询
  • 查询响应时间为百毫秒~几秒钟之间
  • 时序数据
  • 可能有多个表,但是每个查询仅命中其中某一个表
  • 具有高基数数据列(例如 URL,用户 ID 等),并且需要对其进行快速计数和排序
  • 要从 Kafka,HDFS,本地文件或 Amazon S3、AliyunOSS 之类的对象存储中加载数据

Apache Druid 支持将 groupBy 查询的中间结果溢出存储至磁盘,以应对大查询导致的内存资源不足。通过 druid.query.groupBy.maxOnDiskStorage 配置项可以控制对应的磁盘空间大小,默认值为 0,表示不开启该特性。

1.4.3 不合适的场景

  • 使用主键对现有记录进行低延迟更新。Druid 支持流式插入,但不支持流式更新(使用后台批处理作业完成更新)
  • 正在构建脱机报告系统,此时查询延迟不是很重要
  • 需要进行大的联接查询(将一个大事实表连接到另一个大事实表),并且可以花很长时间来完成这些查询

二、原理与架构剖析

2.1 基本架构

Apache Druid 基本架构图

Druid 总体包含以下 5 类节点:

  1. 中间管理节点(middleManager node):及时摄入实时数据,已生成 Segment 数据文件。

MiddleManager 进程是执行提交的任务的工作节点。Middle Managers 将任务转发给在不同 JVM 中运行的 Peon进程(如此,可以做到资源和日志的隔离)。MiddleManager、Peon、Task 的对应关系是,每个 Peon 进程一次只能运行一个Task 任务,但一个 MiddleManager 却可以管理多个 Peon 进程。

  1. 历史节点(historical node):加载已生成好的数据文件,以供数据查询。historical 节点是整个集群查询性能的核心所在,因为 historical 会承担绝大部分的 segment 查询。

Historical 进程从 Deep Storage 中下载 Segment,并响应有关这些 Segment 的查询请求(这些请求来自Broker 进程)。另外,Historical 进程不处理写入请求 。Historical 进程采用了无共享架构设计,它知道如何去加载和删除 Segment,以及如何基于 Segment 来响应查询。因此,即便底层的 Deep Storage无法正常工作,Historical 进程还是能针对其已同步的 Segments,正常提供查询服务

  1. 查询节点(broker node):接收客户端查询请求,并将这些查询转发给 Historicals 和 MiddleManagers。当 Brokers 从这些子查询中收到结果时,它们会合并这些结果并将它们返回给调用者。

  2. 协调节点(coordinator node):主要负责历史节点的数据负载均衡,以及通过规则(Rule) 管理数据的生命周期。协调节点告诉历史节点加载新数据、卸载过期数据、复制数据、 和为了负载均衡移动数据。

Coordinator 是周期性运行的(由 druid.coordinator.period 配置指定,默认执行间隔为 60s)。因为需要评估集群的当前状态,才能决定应用哪种策略,所以,Coordinator 需要维护和 ZooKeeper 的连接,以获取集群的信息。而关于 Segment 和 Rule 的信息保存在了元数据库中,所以也需要维护与元数据库的连接。

  1. 统治者(overlord node):进程监视 MiddleManager 进程,并且是数据摄入 Druid 的控制器。他们负责将提取任务分配给 MiddleManagers 并协调 Segement 发布,包括接受、拆解、分配 Task,以及创建 Task 相关的锁,并返回 Task 的状态。大致流程如下:

除了上述五个节点之外,还有一个 Router 负责将请求路由到Broker, Coordinators和Overlords。

  • Router 进程可以在 Brokers、Overlords 和 Coordinators 进程之上,提供一层统一的 API网关。Router 进程本身是可选的,不过如果集群的数据规模已经达到了 TB级别,还是需要考虑启用的(druid.router.managementProxy.enabled=true)。因为一旦集群规模达到一定的数量级,那么发生故障的概率就会变得不容忽视,而 Router 支持将请求只发送给健康的节点,避免请求失败。同时,查询的响应时间和资源消耗,也会随着数据量的增长而变高,而 Router 支持设置查询的优先级和负载均衡策略,避免了大查询造成的队列堆积或查询热点等问题。
  • 另外,Router 节点还可用于将查询路由到不同的 Broker 节点,便于实现冷热分层,以更好地应对超大规模数据集。默认情况下,Router 会根据设置的 Rule 规则,来路由查询请求。例如,如果将最近 1 个月的数据加载到热集群中,则最近一个月内的查询可以路由到一组专用 Broker,超出该时间范围的查询将被路由到另一组 Broker,如此便实现了查询的冷热隔离。

Apache Druid Hot-Warm 如下图:

以上 所讲 Druid 的进程可以被任意部署,但是为了理解与部署组织方便。

这些进程分为了三类:

  • Master: Coordinator, Overload 负责数据可用性和摄取
  • Query: Broker and Router,负责处理外部请求
  • Data: Historical and MiddleManager,负责实际的Ingestion负载和数据存储

2.2 外部依赖

同时,Druid 还包含 3 类外部依赖:

  1. 数据文件存储库(Deep Storage):存放生成的 Segment 数据文件,并供历史服务器下载, 对于单节点集群可以是本地磁盘,而对于分布式集群一般是 HDFS。

Druid 仅将 Deep Storage 用作数据的备份,并将其作为在 Druid 进程之间在后台传输数据的一种方式。当接受到查询请求,Historical 进程不会从 Deep Storage 读取数据,而是在响应任何查询之前,读取从本地磁盘 pre-fetched 的 Segments。这意味着 Druid 在查询期间永远不需要访问 Deep Storage,从而极大地降低了查询延迟。这也意味着,必须保证 Deep Storage 和 Historical 进程所在节点,能拥有足够的磁盘空间。

  1. 元数据库(Metadata Storage),存储 Druid 集群的元数据信息,比如 Segment 的相关信息,一 般用 MySQL 或 PostgreSQL。

  2. Zookeeper:为 Druid 集群提供以执行协调服务。如内部服务的监控,协调和领导者选举。

涵盖了以下的几个主要特性:Coordinator 节点的 Leader 选举 Historical 节点发布 Segment 的协议 Coordinator 和 Historical 之间 load / drop Segment 的协议 Overlord 节点的 Leader 选举 Overlord 和 MiddleManager 之间的 Task 管理

2.3 架构演进

设计总图

Apache Druid 初始版本架构图 ~ 0.6.0(2012~2013)

初始版本架构图

0.7.0 ~ 0.12.0(2013~2018)

Apache Druid 旧架构图——数据流转


查询路径——红色箭头: ①客户端向Broker发起请求,Broker会将请求路由到 ②实时节点和 ③历史节点

Druid数据流转——黑色箭头:数据源包括实时流和批量数据 ④实时流经过索引直接写到实时节点 ⑤批量数据通过 IndexService 存储到 DeepStorage ⑥再由历史节点加载. ⑦实时节点也可以将数据转存到 DeepStorage

Apache Druid 旧架构图——集群管理

0.13.0 ~ 当前版本(2018~now)


从架构图中可以看出来 Apache Druid 集群的通讯是基于 Apache ZooKeeper 的。

2.4 Lambda 流式架构


通常流式数据的链路为 Raw data → Kafka → Stream processor(optional, typically for ETL) → Kafka(optional)→ Druid → Application / user,而批处理的链路为 Raw data → Kafka(optional)→ HDFS → ETL process(optional)→ Druid → Application / user

三、Apache Druid的数据结构

3.1 前言

与Druid 架构相辅相成的是其基于DataSource 与 Segment 的数据结构,它们共同成就了Druid 的高性能优势。

3.2 DataSource 结构

若与传统的关系型数据库管理系统( RDBMS)做比较,Druid 的DataSource 可以理解为 RDBMS 中的表(Table)。DataSource 的结构包含以下几个方面。

  1. 时间列( TimeStamp):表明每行数据的时间值,默认使用UTC 时间格式且精确到毫秒级别。这个列是数据聚合与范围查询的重要维度。
  2. 维度列(Dimension):维度来自于OLAP 的概念,用来标识数据行的各个类别信息,参与事件过滤。
  3. 指标列( Metric):指标对应于OLAP 概念中的Fact,是用于聚合和计算的列。这些指标列通常是一些数字,计算操作通常包括Count、Sum 和Mean 等。
  • DataSource 结构 结构如图所示:

无论是实时数据消费还是批量数据处理, Druid 在基于DataSource 结构存储数据时即可选择对任意的指标列进行聚合( RollUp)操作。该聚合操作主要基于维度列与时间范围两方面的情况。

Druid读取数据的入口并不会直接存储原始数据, 而是使用Roll-up这种first-level聚合操作压缩原始数据。

  • 下图显示的是执行聚合操作后DataSource 的数据情况。

相对于其他时序数据库, Druid 在数据存储时便可对数据进行聚合操作是其一大特点,该特点使得Druid 不仅能够节省存储空间,而且能够提高聚合查询的效率。

用SQL表示类似于对时间撮和所有维度列进行分组,并以原始的指标列做常用的聚合操作。

GROUP BY timestamp, publisher, advertiser, gender, country
  :: impressions = COUNT(1),  clicks = SUM(click),  revenue = SUM(price)

为什么不存原始数据? 因为原始数据量可能非常大,对于广告的场景,一秒钟的点击数是以千万计数。如果能够在读取数据的同时就进行一点聚合运算,就可以大大减少数据量的存储,这种方式的缺点是不能查询单条事件,也就是你无法查到每条事件具体的click和price值了。

由于后面的查询都将以上面的查询为基础,所以Roll-up的结果一定要能满足查询的需求。通常count和sum就足够了,因此Rollup的粒度是你能查询的数据的最小时间单位。假设每隔1秒Rollup一次,后面的查询你最小只能以一秒为单位,不能查询一毫秒的事件,默认的粒度单位是ms。

3.2.1 数据结构(维度列)

维度列因为要支持过滤和分组,每一个维度列的数据结构包含了三部分:

  • 值到ID的Map映射
  • 列的值列表,存储的是上一步对应的ID
  • 倒排索引

示例进行说明:

代表这一维度列的数据结构如下:

1: Dictionary that encodes column values
  {
    "Justin Bieber": 0,
    "Ke$ha":         1
  }

2: Column data
  [0,
   0,
   1,
   1]

3: Bitmaps - one for each unique value of the column
  value="Justin Bieber": [1,1,0,0]
  value="Ke$ha":         [0,0,1,1]

注意:在最坏情况下前面两种会随着数据量的大小而线性增长. 而BitMap的大小则等于数据量大小 * 列的个数。

3.2.2 结构说明

字典表的key都是唯一的,所以Map的key是unique的column value,Map的value从0开始不断增加。示例数据的page列只有两个不同的值。所以为Bieber编号0,Ke$ha编号为1。

Key            |Value
---------------|-----
Justin Bieber  |0
Ke$ha          |1

列的数据: 要保存的是每一行中这一列的值, 值是ID而不是原始的值。因为有了上面的Map字典,所以有下面的对应关系,这样列的值列表直接取最后一列: [0,0,1,1]。

rowNum  page                ID
1       Justin Bieber       0
2       Justin Bieber       0
3       Ke$ha               1 
4       Ke$ha               1

BitMap的key是第一步Map的key(列的原始值)。value数组的每个元素表示指定列的某一行是否包含/存在/等于当前key。注意:BitMap保存的value数组只有两个值:1和0,1表示这一行包含或等于BitMap的key, 0表示不存在/不包含/不等于,如下:

第一行的page列值为Justin Bieber/列值为Justin Bieber的在第一行里
                        ^
                        |
value="Justin Bieber": [1,1,0,0]
value="Ke$ha":         [0,0,1,1]
                        ^
                        |
             第一行的page列值不是Ke$ha

这种存储方式,如果unique重复的列很少,比如page列的每一个值都是不同的。BitMap就会是一个稀疏矩阵。

A: [1,0,0,0,0,0,0,0,0,0,0]
B: [0,1,0,0,0,0,0,0,0,0,0]
C: [0,0,1,0,0,0,0,0,0,0,0]
D: [0,0,0,1,0,0,0,0,0,0,0]
E: [0,0,0,0,1,0,0,0,0,0,0]
  • unique的重复数量很少也叫做high cardinality,表示基数很高,不同列的数量很多,列值相同的记录数很少。

  • 稀疏矩阵对于BitMap而言却是有优点的,因为越是稀疏,它可以被压缩的比例越大,最后存储的空间越少(相对原始数据)。

  • 上面只是针对page列的BitMap,对于其他的维度列,都有自己的BitMap!即每一个维度列都有一个BitMap。

3.3 Segment 结构

DataSource 是一个逻辑概念, Segment 却是数据的实际物理存储格式, Druid 正是通过Segment 实现了对数据的 横纵向切割( Slice and Dice)操作。从数据按时间分布的角度来看,通过参数segmentGranularity 的设置,Druid 将不同时间范围内的数据存储在不同的Segment 数据块中,这便是所谓的数据横向切割。

这种设计为Druid 带来一个显而易见的优点:按时间范围查询数据时,仅需要访问对应时间段内的这些Segment 数据块,而不需要进行全表数据范围查询,这使效率得到了极大的提高。

Druid的分片是Segment文件。Druid首先总是以时间戳进行分片,因为事件数据总是有时间戳。假设以小时为粒度创建下面的两个Segment文件


通过Segment 将数据按时间范围存储,同时,在Segment 中也面向列进行数据压缩存储,这便是所谓的数据纵向切割。而且在Segment 中使用了Bitmap 等技术对数据的访问进行了优化。
  • 在几乎所有的NoSQL中都有数据分片的概念:比如ES的分片,HBase的Region,都表示的是数据的存储介质。为什么要进行分片,因为数据大了,不能都存成一个大文件吧,所以要拆分成小文件以便于快速查询,伴随拆分通常都有合并小文件。

  • 从Segment文件的名称可以看出它包含的数据一定是在文件名称对应的起始和结束时间间隔之内的。

  • Segment文件名称的格式:dataSource_interval_version_partitionNumber最后一个分区号是当同一个时间戳下数据量超过阈值要分成多个分区

    • 分片和分区都表示将数据进行切分。分片是将不同时间戳分布在不同的文件中,而分区是相同时间戳放不下了,分成多个分区
    • 巧合的是Kafka中也有Segment和Partition的概念。Kafka的Partition是topic物理上的分组,一个topic可以分为多个partition,它的partition物理上由多个segment组成。即Partition包含Segment,而Druid是Segment包含Partition。

3.3.1 学习小结

数据进入到Druid首先会 进行索引,这给予了Druid一个机会可以进行分析数据,添加索引结构、压缩、为查询优化调整存储结构

  1. 转换为列式结构
  2. 使用BitMap索引
  3. 使用不同的压缩算法
  • 索引的结果是生成Segment文件,Segment中除了保存不同的维度和指标,还保存了这些列的索引信息。

  • Druid将索引数据保存到Segment文件中,Segment文件根据时间进行分片。最基本的设置中,每一个时间间隔都会创建一个Segment文件。

  • 这个时间间隔的长度配置在granularitySpec的segmentGranularity参数。为了Druid工作良好,通常Segment文件大小为300-700M。

  • 前面Roll-up时也有一个时间粒度:queryGranularity指的是在读取时就进行聚合,segmentGranularity则是用于分片进来之后的数据。

四、部署和配置

4.1 单机版

4.1.1 Jar 包下载

从 https://imply.io/get-started 下载最新版本安装包

4.1.2 Druid 的安装部署

说明:imply 集成了Druid,提供了Druid 从部署到配置到各种可视化工具的完整的解决方案, imply 有点类似Cloudera Manager。

1.解压

tar -zxvf imply-2.7.10.tar.gz -C /opt/module

目录说明如下:

  • bin/ - run scripts for included software.
  • conf/ - template configurations for a clustered setup.
  • conf-quickstart/* - configurations for the single-machine quickstart.
  • dist/ - all included software.
  • quickstart/ - files related to the single-machine quickstart.

2.修改配置文件

1)修改Druid 的ZK 配置

[chris@hadoop102 _common]$ pwd
/opt/module/imply/conf/druid/_common
[chris@hadoop102 _common]$ vi common.runtime.properties
druid.zk.service.host=hadoop102:2181,hadoop103:2181,hadoop104:218
1

2)修改启动命令参数,使其不校验不启动内置ZK

[chris@hadoop102 supervise]$ pwd
/opt/module/imply/conf/supervise
:verify bin/verify-java
#:verify bin/verify-default-ports
#:verify bin/verify-version-check
:kill-timeout 10
#!p10 zk bin/run-zk conf-quickstart

3.启动

1)启动zookeeper

2)启动imply

[chris@hadoop102 imply]$ bin/supervise -c
conf/supervise/quickstart.conf

说明:每启动一个服务均会打印出一条日志。可以通过/opt/module/imply-2.7.10/var/sv/查看服务启动时的日志信息

3)查看端口号9095 的启动情况

[chris@hadoop102 ~]$ netstat -anp | grep 9095
tcp 0 0 :::9095 :::*
LISTEN 3930/imply-ui-linux
tcp 0
0 ::ffff:192.168.1.102:9095 ::ffff:192.168.1.1:52567
ESTABLISHED 3930/imply-ui-linux
tcp 0
0 ::ffff:192.168.1.102:9095 ::ffff:192.168.1.1:52568
ESTABLISHED 3930/imply-ui-linux

4.登录hadoop102:9095 查看


5.停止服务

按Ctrl + c 中断监督进程, 如果想中断服务后进行干净的启动, 请删除/opt/module/imply-2.7.10/var/目录。

五、通过 Imply 快速入门

5.1 在线加载样本数据

5.1.1 打开imply

基于前面单机版的配置,接下来基于 Imply 我们完成今天的入门Demo。

5.1.2 开始连接到实例Wikipedia 数据集


5.1.3 加载样本数据

Wikipedia 示例使用Http 数据加载器从URI 路径读取数据,格式为json。可以通过点击采样并继续,对文件前几行的数据进行采样,以确保它是可解析的数据。


5.1.4 配置汇总


5.1.5 配置时间戳和分区


5.1.6 配置要加载的列


5.1.7 确认并开始摄取

一旦加载器指示数据已被索引,就可以继续下一部分来定义数据立方体并开始可视化数据。

5.2 离线加载样本数据

如果无法访问公共Web 服务器,则可以从本地文件加载相同的数据集。该quickstart 目录包括一个样本数据集和一个摄取规范来处理数据, 分别命名 wikipedia-2016-06-27-sampled.jsonwikipedia-index.json

要为此摄取规范向Druid 提交索引作业,请从Imply 目录运行以下命令:

bin/post-index-task --file quickstart/wikipedia-index.json

成功运行将生成类似于以下内容的日志:

Beginning indexing data for wikipedia
Task started: index_wikipedia_2017-12-05T03:22:28.612Z
Task log:
http://localhost:8090/druid/indexer/v1/task/index_wikipedia_2017-
12-05T03:22:28.612Z/log
Task status:
http://localhost:8090/druid/indexer/v1/task/index_wikipedia_2017-
12-05T03:22:28.612Z/status
Task index_wikipedia_2017-12-05T03:22:28.612Z still running...
Task index_wikipedia_2017-12-05T03:22:28.612Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto
the cluster...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia loading complete! You may now query your data

5.3 创建数据立方体

通过单击顶部栏上的相应按钮切换到Imply 的“ 可视化”部分。从这里,您可以创建数据立方体来建模数据,浏览这些立方体,并将视图组织到仪表板中。首先单击+创建新数据多维数据集。

在出现的对话框中,确保wikipedia 选中此源并选择自动填充尺寸和度量。单击下一步继续:创建数据立方体。

5.4 可视化数据立方体

单击“ 保存”后,将自动加载此新数据多维数据集的数据立方体视图。将来,还可以通过从“ 可视化”屏幕单击数据立方体的名称(在此示例中为“Wikipedia”)来加载此视图。


在这里,可以通过过滤并在任何维度上拆分数据集来探索数据集。对于数据的每次过滤拆分,将看到所选度量的总值。

例如,在维基百科数据集上,通过在page 上拆分和按事件数排序查看最常编辑的page)。


数据立方体视图根据分割数据的方式建议不同的可视化。如果拆分字符串列,则数据最初将显示为表格。如果按时间拆分,数据立方体视图将推荐时间序列图,如果在数字列上拆分,则会到条形图。


5.5 运行SQL

访问SQL 编辑器。

SELECT page, COUNT(*) AS Edits
FROM wikipedia
WHERE "__time" BETWEEN TIMESTAMP '2016-06-27 00:00:00' AND TIMESTAMP
'2016-06-28 00:00:00'
GROUP BY page
ORDER BY Edits
DESC LIMIT 5

应该看到如下结果:


六、Apache Druid 数据摄入

6.1 数据格式

  1. 摄入规范化数据:JSON、CSV、TSV
  2. 自定义格式
  3. 其他格式

6.2 配置

主要是摄入的规则 Ingestion Spec

Ingestion Spec(数据格式描述)是Druid对要索引数据的格式以及如何索引该数据格式的一个统一描述,它是一个JSON文件,一般由三部分组成。

FieldTypeDescriptionRequired
dataSchemaJSON Object标识摄入数据的schema,dataSchema 是固定的,不随数据消费方式改变。不同specs 可共享  。yes
ioConfigJSON Object标识data 从哪来,到哪去。数据消费方式不同,ioConfig也不相同。yes
tuningConfigJSON Object标识如何调优不同的ingestion parameters 。根据不同的 ingestion method 不同。no
{
"dataSchema" : {...},
"ioConfig" : {...},
"tuningConfig" : {...}
}

6.2.1 DataSchema

第一部分的dataSchema描述了数据的格式,如何解析该数据,典型结构如下。

{
    "dataSource": <name_of_dataSource>,
    "parser": {
        "type": <>,
        "parseSpec": {
            "format": <>,
            "timestampSpec": {},
            "dimensionsSpec": {}
        }
    },
    "metricsSpec": {},
    "granularitySpec": {}
}
FieldTypeDescriptionRequired
dataSourceString要摄入的datasource 名称,Datasources 可看做为表yes
parserJSON Objectingested data 如何解析yes
metricsSpecJSON Object arrayaggregators(聚合器) 器列表yes
granularitySpecJSON Object数据聚合设置,指定segment 的存储粒度和查询粒度yes
6.2.1.1 parser

parser部分决定了数据如何被正确地解析,metricsSpec定义了数据如何被聚集计算,granularitySpec定义了数据分片的粒度、查询的粒度。

对于parser,type有两个选项:string和hadoopString,后者用于Hadoop索引的 job。parseSpec是数据格式解析的具体定义。

(1)string parser

FieldTypeDescriptionRequired
typeString一般为string,或在Hadoop indexing job 中使用hadoopyStringno
parseSpecJSON Object标识格式format 和、imestamp、dimensionsyes

parseSpec 两个功能:

  • String Parser 用parseSpec 判定将要处理rows 的数据格式( JSON, CSV, TSV)
  • 所有的Parsers 用parseSpec 判定将要处理rows 的timestamp 和dimensionsAll

JSON ParseSpec

FieldTypeDescriptionRequired
formatStringjsonno
timestampSpecJSON Object指明时间戳列名和格式yes
dimensionsSpecJSON Object指明维度的设置yes
flattenSpecJSON Object若json 有嵌套层级,则需要指定no

CSV ParseSpec

FieldTypeDescriptionRequired
formatStringcsv.yes
timestampSpecJSON Object指明时间戳列名和格式yes
dimensionsSpecJSON Object指明维度的设置yes
listDelimiterString多值dimensions 的分割符no(default = ctrl+A)
columnsJSON arraycsv 的数据列名yes

TSV ParseSpec

FieldTypeDescriptionRequired
formatStringtsv.yes
timestampSpecJSON Object指明时间戳列名和格式yes
dimensionsSpecJSON Object指明维度的设置yes
listDelimiterString多值dimensions 的分割符no(default = ctrl+A)
columnsJSON arraytsv 的数据列名yes
delimiterString数据之间的分隔符,默认是\tno

对于不同的数据格式,可能还有额外的parseSpec选项。

TimestampSpec

FieldTypeDescriptionRequired
columnStringtimestamp 的列yes
formatStringiso, millis, posix, auto or Joda time,时间戳格式no (default = 'auto')

DimensionsSpec

FieldTypeDescriptionRequired
dimensionsJSON数组描述数据包含哪些维度。每个维度可以只是个字符串,或者可以额外指明维度的属性,例如 “dimensions”: [ “dimenssion1”, “dimenssion2”, “{“type”: “long”, “name”: “dimenssion3”} ],默认是string类型。yes
dimensionExclusionsJSON字符串数组数据消费时要剔除的维度。no (default == [])
spatialDimensionsJSON对象数组空间维度名列表,主要用于地理几何运算no (default == [])
6.2.1.2  metricsSpec

metricsSpec是一个JSON对象数组,定义了一些聚合器(aggregators)。聚合器通常有如下的结构。

{
    "type": <type>,
    "name": <output_name>,
    "fieldName": <metric_name>
}
FieldTypeDescriptionRequired
dimensionsStringcount,longSum 等聚合函数类型yes
fieldNameString聚合函数运用的列名no
nameString聚合后指标的列名yes

一些简单的聚合函数:

count 、longSum、longMin、longMax、doubleSum、doubleMin、doubleMax

6.2.1.3 GranularitySpec

聚合支持两种聚合方式:uniform和arbitrary,前者以一个固定的时间间隔聚合数据,后者尽量保证每个segments大小一致,时间间隔是不固定的。目前uniform是默认选项。

FieldTypeDescriptionRequired
typeStringuniformyes
segmentGranularitystringsegment 的存储粒度,HOUR DAY 等yes
queryGranularitystring最小查询粒度MINUTE HOURyes
intervalsJSON Object array数据消费时间间隔 ,可选,对于流式数据 pull 方式而言可以忽略no
"dataSchema" : {
 "dataSource" : "wikipedia",
 "parser" : {
  "type" : "string",
  "parseSpec" : {
   "format" : "json",
   "dimensionsSpec" : {
    "dimensions" : [
     "channel",
     "cityName",
     "comment",
     "countryIsoCode",
     "countryName",
     "isAnonymous",
     "isMinor",
     "isNew",
     "isRobot",
     "isUnpatrolled",
     "metroCode",
     "namespace",
     "page",
     "regionIsoCode",
     "regionName",
     "user",
     { "name" : "commentLength", "type" : "long" },
     { "name" : "deltaBucket", "type" : "long" },
     "flags",
     "diffUrl",
     { "name": "added", "type": "long" },
     { "name": "deleted", "type": "long" },
     { "name": "delta", "type": "long" }
    ]
   },
   "timestampSpec": {
    "column": "timestamp",
    "format": "iso"
   }
  }
 },
 "metricsSpec" : [],
 "granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "day",
  "queryGranularity" : "none",
  "intervals" : ["2016-06-27/2016-06-28"],
  "rollup" : false
 }
}

6.2.2 ioConfig

ioConfig 指明了真正具体的数据源

FieldTypeDescriptionRequired
typeStringalways be 'realtime'.yes
firehoseJSON Object指明数据源,例如本地文件 kafkayes
plumberJSON ObjectWhere the data is going.yes

不同的firehose 的格式不太一致,以kafka 为例:

{
 firehose : {
  consumerProps : {
   auto.commit.enable : false
   auto.offset.reset : largest
   fetch.message.max.bytes : 1048586
   group.id : druid-example
   zookeeper.connect : localhost:2181
    zookeeper.connect.timeout.ms : 15000
    zookeeper.session.timeout.ms : 15000
    zookeeper.sync.time.ms : 5000
  },
  feed : wikipedia
  type : kafka-0.8
 }
}

ioConfig 的案例:

"ioConfig" : {
 "type" : "index",
 "firehose" : {
  "type" : "local",
  "baseDir" : "quickstart/",
  "filter" : "wikipedia-2016-06-27-sampled.json"
 },
 "appendToExisting" : false
}

6.2.3 tuningConfig

tuningConfig 这部分配置是优化数据输入的过程

FieldTypeDescriptionRequired
typeStringrealtimeno
maxRowsInMemoryInteger在存盘之前内存中最大的存储行数,指的是聚合后的行数indexing 所需Maximum heap memory= maxRowsInMemory * (2 +maxPendingPersists).no (default ==75000)
windowPeriodISO 8601 Period String默认10 分钟,最大可容忍时间窗口,超过窗口,数据丢弃no (default ==PT10m)
intermediatePersistPeriodISO 8601 Period String多长时间数据临时存盘一次no (default ==PT10m)
basePersistDirectoryString临时存盘目录no (default == javatmp dir)
versioningPolicyObject如何为segment 设置版本号no (default == basedon segment start time)
rejectionPolicyObject数据丢弃策略no (default =='serverTime')
maxPendingPersistsInteger最大同时存盘请求数,达到上限,输入将会暂停no (default == 0)
shardSpecObject分片设置no (default =='NoneShardSpec')
buildV9DirectlyBoolean是否直接构建V9 版本的索引no (default == true)
persistThreadPriorityint存盘线程优先级no (default == 0)
mergeThreadPriorityint存盘归并线程优先级no (default == 0)
reportParseExceptionsBoolean是否汇报数据解析错误no (default == false)
"tuningConfig" : {
 "type" : "index",
 "targetPartitionSize" : 5000000,
 "maxRowsInMemory" : 25000,
 "forceExtendableShardSpecs" : true
}

6.3 从 Hadoop 加载数据

6.3.1 加载数据

批量摄取维基百科样本数据,文件位于quickstart/wikipedia-2016-06-27-sampled.json。使用quickstart/wikipedia-index-hadoop.json 摄取任务文件。

bin/post-index-task --file quickstart/wikipedia-index-hadoop.json

此命令将启动Druid Hadoop 摄取任务。

摄取任务完成后,数据将由历史节点加载,并可在一两分钟内进行查询。

6.3.2 查询数据


6.4 从 kafka 加载数据

6.4.1 准备kafka

  1. 启动kafka
[chris@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties
[chris@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties
[chris@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties
  1. 创建wikipedia 主题
[chris@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 –topic
wikipedia --partitions 1 --replication-factor 1 –create
Created topic "wikipedia".
  1. 查看主题是否创建成功
[chris@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
__consumer_offsets
first
wikipedia

6.4.2 启动索引服务

我们将使用Druid 的Kafka 索引服务从我们新创建的维基百科主题中提取消息。要启动该服务,我们需要通过从Imply 目录运行以下命令向Druid 的overlord 提交supervisor spec

[chris@hadoop102 imply-2.7.10]$ curl -XPOST -H'Content-Type: application/json' -d
@quickstart/wikipedia-kafka-supervisor.json
http://hadoop102:8090/druid/indexer/v1/supervisor

说明:curl 是一个利用 URL 规则在命令行下工作的文件传输工具。它支持文件的上传和下载,所以是综合传输工具。

  • -X 为HTTP 数据包指定一个方法,比如PUT、DELETE。默认的方法是GET 6.4.3
  • -H 为HTTP 数据包指定 Header 字段内容
  • -d 为POST 数据包指定要向HTTP 服务器发送的数据并发送出去,如果<data>的内容以符号@ 开头,其后的字符串将被解析为文件名,curl 命令会从这个文件中读取数据发送。

6.4.3 加载历史数据

启动kafka 生产者生产数据

[chris@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 –
-topic wikipedia < /opt/module/imply-2.7.10/quickstart/wikipedia-2016-06-27-sampled.json

说明:< 将文件作为命令输入 可在kafka 本地看到相应的数据生成

[chris@hadoop103 logs]$ pwd
/opt/module/kafka/logs

将样本事件发布到Kafka 的wikipedia 主题,然后由Kafka 索引服务将其提取到Druid 中。你现在准备运行一些查询了!

6.4.4 加载实时数据

下载一个帮助应用程序,该应用程序将解析维基媒体的IRC 提要中的event,并将这些event发布到我们之前设置的Kafka 的wikipedia 主题中。

[chris@hadoop102 imply-2.7.10]$ curl -O
https://static.imply.io/quickstart/wikiticker-0.4.tar.gz

说明:

-O 在本地保存获取的数据时,使用它们在远程服务器上的文件名进行保存。

[chris@hadoop102 imply-2.7.10]$ tar -zxvf wikiticker-0.4.tar.gz
[chris@hadoop102 imply-2.7.10]$ cd wikiticker-0.4

现在运行带有参数的wikiticker,指示它将输出写入我们的Kafka 主题:

[chris@hadoop102 wikiticker-0.4]$ bin/wikiticker -J-Dfile.encoding=UTF-8 -out kafka –
topic Wikipedia

查询多次,对比结果的变化。


6.4.5 加载自定义kafka 主题数据

可以通过编写自定义 supervisor spec 来加载自己的数据集。

要自定义受监督的 Kafka 索引服务提取,可以将包含的 quickstart/wikipedia-kafka-supervisor.json 规范复制到自己的文件中,根据需要进行编辑,并根据需要创建或关闭管理程序。没有必要自己重启 Imply 或Druid 服务。

七、Apache Druid 核心插件 Kafka Indexing Service & SLS Indexing Service

7.1 前言

Kafka Indexing Service 是 Apache Druid 推出的使用 Apache Druid 的 Indexing Service 服务实时消费 Kafka 数据的插件。

Kafka Indexing Service 可以在 Overlord 上配置 Supervisor(这里的监管者具体是指 KafkaSupervisor,负责监控单个 DataSource 下的 KafkaIndexTask

在其构造的时候,可以接受 KafkaSupervisorSpec 以知晓 Kafka 的 Topic 相关的配置信息,以及摄入的规则,用于生成 KafkaIndexTask 索引任务),并负责管理 Kafka 索引任务的创建和生命周期。

这些 KIS 任务使用 Kafka 自身的分区和偏移机制来读取事件,因此能够提供 exactly-once 摄取的保证(旧版本下,Tranquility 采用的是 push 的方式,则完全无法实现不丢不重的特性)。

KIS 任务还能够从 Kafka 读取非近期事件,并且不受其他摄取机制强加的窗口期限的影响。另外,Supervisor 会监控索引任务的状态,以便管理故障,并保证了可伸缩性和易复制的特性。更多差异点,详见下面的对比表:


在 0.16.0 版本中,Apache Druid 彻底删除了 Realtime Node 相关的插件,包括了 druid-kafka-eight、druid-kafka-eight-simpleConsumer、druid-rabbitmq 和 druid-rocketmq

虽然新引入的 KIS 有诸多好处,但是世上并不存在“银弹”。

因为 KIS 采用了 pull 的方式摄入数据,必然会存在拉取的频率一说。该频率由 offsetFetchPeriod 参数控制,默认 30s 会拉取一次,而最快只能 5s 拉取一次。那为什么不能设置更小的值呢?因为如果过于频繁地向 Kafka 发起请求,可能影响到 Kafka 的稳定性。

补充:上文我们也讲到该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会在 Middlemanager 中启动一些 indexing task,这些 task 会连接到 Kafka 集群消费 topic 数据,并完成索引创建。

  1. task 创建和运行的过程

  1. task停止的过程

7.2 与 Kafka 集群交互

E-MapReduce Druid 集群与 Kafka 集群交互的配置方式与 Hadoop 集群类似,均需要设置连通性、hosts 等。

对于非安全 Kafka 集群,请按照以下步骤操作:

  1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
  2. 将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。

注意 Kafka 集群的 hostname 应采用长名形式,例如 emr-header-1.cluster-xxxxxxxx。

对于安全 Kafka 集群,您需要执行下列操作(前两步与非安全 Kafka 集群相同):

  1. 确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
  2. 将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。

注意 Kafka 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx。

  1. 设置两个集群间的 Kerberos 跨域互信(详情请参见 跨域互信 ),推荐做双向互信。
  2. 准备一个客户端安全配置文件,文件内容格式如下。
KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/etc/ecm/druid-conf/druid.keytab"
      principal="druid@EMR.1234.COM";
  };

文件准备好后,将该配置文件同步到 E-MapReduce Druid 集群的所有节点上,放置于某一个目录下面(例如/tmp/kafka/kafka_client_jaas.conf)。

  1. 在 E-MapReduce Druid 配置页面的 overlord.jvm 中新增如下选项。
Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
  1. 在 E-MapReduce Druid 配置页面的 middleManager.runtime 中配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他 JVM 启动参数。
  2. 重启 Druid 服务。

7.3 使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据

  1. 在 Kafka 集群(或 Gateway)上执行以下命令创建一个名称为 metrics 的 topic。
-- 如果开启了 Kafka 高安全:
 export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
 --
 kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2,emr-header-3/kafka-1.0.0 --partitions 1 --replication-factor 1 --topic metrics

实际创建 topic 时,您需要根据您的环境配置来替换上述命令中的各个参数。其中,--zookeeper 参数中 /kafka-1.0.0 是一个路径,该路径的获取方法是:登录阿里云 E-MapReduce 控制台> 进入 Kafka 集群的 Kafka 服务配置页面 > 查看 zookeeper.connect 配置项的值。如果您的 Kafka 集群是自建集群,则您需要根据集群的实际配置来替换 --zookeeper 参数。

  1. 定义数据源的数据格式描述文件(名称命名为 metrics-kafka.json),并放置在当前目录下(或放置在其他您指定的目录上)。
{
     "type": "kafka",
     "dataSchema": {
         "dataSource": "metrics-kafka",
         "parser": {
             "type": "string",
             "parseSpec": {
                 "timestampSpec": {
                     "column": "time",
                     "format": "auto"
                 },
                 "dimensionsSpec": {
                     "dimensions": ["url", "user"]
                 },
                 "format": "json"
             }
         },
         "granularitySpec": {
             "type": "uniform",
             "segmentGranularity": "hour",
             "queryGranularity": "none"
         },
         "metricsSpec": [{
                 "type": "count",
                 "name": "views"
             },
             {
                 "name": "latencyMs",
                 "type": "doubleSum",
                 "fieldName": "latencyMs"
             }
         ]
     },
     "ioConfig": {
         "topic": "metrics",
         "consumerProperties": {
             "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092(您 Kafka 集群的 bootstrap.servers)",
             "group.id": "kafka-indexing-service",
             "security.protocol": "SASL_PLAINTEXT",
             "sasl.mechanism": "GSSAPI"
         },
         "taskCount": 1,
         "replicas": 1,
         "taskDuration": "PT1H"
     },
     "tuningConfig": {
         "type": "kafka",
         "maxRowsInMemory": "100000"
     }
 }

说明 ioConfig.consumerProperties.security.protocol 和 ioConfig.consumerProperties.sasl.mechanism 为安全相关选项(非安全 Kafka 集群不需要)。

  1. 执行下述命令添加 Kafka supervisor。
curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

其中 --negotiate-u-b-c 等是针对安全 E-MapReduce Druid 集群的选项。

  1. 在 Kafka 集群上开启一个 console producer。
-- 如果开启了 Kafka 高安全:
 export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
 echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/Kafka/producer.conf
 --
 Kafka-console-producer.sh --producer.config /tmp/kafka/producer.conf --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic metrics
 >

其中 --producer.config /tmp/Kafka/producer.conf 是针对安全 Kafka 集群的选项。

  1. 在 kafka_console_producer 的命令提示符下输入一些数据。
{"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
{"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11}
{"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

其中时间戳可用如下 python 命令生成:

python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
  1. 准备一个查询文件,命名为 metrics-search.json。
{
     "queryType" : "search",
     "dataSource" : "metrics-kafka",
     "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"],
     "granularity" : "all",
     "searchDimensions": [
         "url",
         "user"
     ],
     "query": {
         "type": "insensitive_contains",
         "value": "bob"
     }
 }
  1. 在 E-MapReduce Druid 集群 Master 上执行查询。
curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty

其中--negotiate-u-b-c 等是针对安全 E-MapReduce Druid 集群的选项。

正常返回结果示例:

[ {
   "timestamp" : "2018-03-06T09:00:00.000Z",
   "result" : [ {
     "dimension" : "user",
     "value" : "bob",
     "count" : 2
   } ]
 } ]

7.4 关于 SLS Indexing Service

SLS Indexing Service 是 E-MapReduce 推出的一个 Druid 插件,用于从 SLS 消费数据。

7.4.1 背景介绍

SLS Indexing Service 消费原理与 Kafka Indexing Service 类似,因此也支持 Kafka Indexing Service 一样的 Exactly-Once 语义。其综合了 SLS 与 Kafka Indexing Service 两个服务的优点:

  • 极为便捷的数据采集,可以利用 SLS 的多种数据采集方式实时将数据导入 SLS。
  • 不用额外维护一个 Kafka 集群,省去了数据流的一个环节。
  • 支持 Exactly-Once 语义。
  • 消费作业高可靠保证,作业失败重试,集群重启/升级业务无感知等。

7.4.2 准备工作

  • 如果还没有开通 SLS 服务,请先开通 SLS 服务,并配置好相应的 Project 和 Logstore。
  • 准备好以下配置项内容:
    • SLS 服务的 endpoint(注意要用内网服务入口)
    • 可访问 SLS 服务的 AccessKeyId 和对应的 AccessKeySecret

7.4.3 使用 SLS Indexing Service

  1. 准备数据格式描述文件 如果熟悉 Kafka Indexing Service,那么 SLS Indexing Service 会非常简单。具体请参见 Kafka Indexing Service 的介绍,我们用同样的数据进行索引,那么数据源的数据格式描述文件如下(将其保存为 metrics-sls.json):
{
    "type": "sls",
    "dataSchema": {
        "dataSource": "metrics-sls",
        "parser": {
            "type": "string",
            "parseSpec": {
                "timestampSpec": {
                    "column": "time",
                    "format": "auto"
                },
                "dimensionsSpec": {
                    "dimensions": ["url", "user"]
                },
                "format": "json"
            }
        },
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "hour",
            "queryGranularity": "none"
        },
        "metricsSpec": [{
                "type": "count",
                "name": "views"
            },
            {
                "name": "latencyMs",
                "type": "doubleSum",
                "fieldName": "latencyMs"
            }
        ]
    },
    "ioConfig": {
        "project": <your_project>,
        "logstore": <your_logstore>,
        "consumerProperties": {
            "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com", (以杭州为例,注意使用内网服务入口)
            "access-key-id": <your_access_key_id>,
            "access-key-secret": <your_access_key_secret>,
            "logtail.collection-mode": "simple"/"other"
        },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"
    },
    "tuningConfig": {
        "type": "sls",
        "maxRowsInMemory": "100000"
    }
}

对比 Kafka Indexing Service 一节中的介绍,我们发现两者基本上是一样的。这里简要列一下需要注意的字段:

  • type: sls。
  • dataSchema.parser.parseSpec.format:与ioConfig.consumerProperties.logtail.collection-mode 有关,也就是与 SLS 日志的收集模式有关。如果是极简模式(simple)收集,那么该处原本文件是什么格式,就填什么格式。如果是非极简模式(other)收集,那么此处取值为 json。
  • ioConfig.project:您要收集的日志的 project。
  • ioConfig.logstore:您要收集的日志的 logstore。
  • ioConfig.consumerProperties.endpoint:SLS 内网服务地址,例如杭州对应 cn-hangzhou-intranet.log.aliyuncs.com
  • ioConfig.consumerProperties.access-key-id:账户的 AccessKeyID。
  • ioConfig.consumerProperties.access-key-secret:账户的 AccessKeySecret。
  • ioConfig.consumerProperties.logtail.collection-mode:SLS 日志收集模式,极简模式填 simple,其他情况填 other。

注意 上述配置文件中的 ioConfig 配置格式仅适用于 EMR-3.20.0 及之前版本。自 EMR-3.21.0 开始,ioConfig 配置变更如下:

"ioConfig": {
        "project": <your_project>,
        "logstore": <your_logstore>,
        "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com", (以杭州为例,注意使用内网服务入口)
        "accessKeyId": <your_access_key_id>,
        "accessKeySec": <your_access_key_secret>,
        "collectMode": "simple"/"other"
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"
    },

即,取消了 consumerProperties 层级、access-key-id、access-key-secret,logtail.collection-mode 变更为 accessKeyIdaccessKeySeccollectMode 。

  1. 执行下述命令添加 SLS supervisor。
curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-sls.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

注意 其中 --negotiate、-u、-b、-c 等选项是针对安全 Druid 集群。

  1. 向 SLS 中导入数据。

可以采用多种方式向 SLS 中导入数据。

  1. 在 Druid 端进行相关查询。

八、E-MapReduce Druid 集群集成 Superset(数据探查与可视化平台 )

8.1 前言

E-MapReduce Druid 集群集成了 Superset 工具。Superset 对 E-MapReduce Druid 做了深度集成,同时也支持多种关系型数据库。

由于 E-MapReduce Druid 也支持SQL,所以可以通过 Superset 以两种方式访问E-MapReduce Druid,即 Apache Druid 原生查询语言或者SQL。

8.2 什么是 Apache Superset?

Superset 是 Airbnb (知名在线房屋短租公司)开源的数据探查与可视化平台(曾用名 Panoramix、Caravel ),该工具在可视化、易用性和交互性上非常有特色,用户可以轻松对数据进行可视化分析。Superset 也是一款企业级商业智能 Web 应用程序。

Superset 已捐赠给 Apache 软件基金会,目前处于孵化阶段。

核心功能:

  • 快速创建数据可视化互动仪表盘

  • 丰富的可视化图表模板,灵活可扩展

  • 细粒度高可扩展性的安全访问模型,支持主要的认证供应商(数据库、OpenID、LDAP、OAuth 等)

  • 简洁的语义层,可以控制数据资源在 UI 的展现方式

  • 与 Druid 深度集成,可以快速解析大规模数据集

8.3 前提条件

Superset默认安装在emr-header-1节点,目前还不支持HA。

在使用该工具前,确保主机能够正常访问 emr-header-1,具体步骤请参见使用SSH连接主节点

8.4 使用Superset

  1. 登录Superset。

在浏览器地址栏中输入http://emr-header-1:18088,按回车,打开Superset登录界面,默认用户名和密码均为admin,请您登录后及时修改密码。


  1. 添加E-MapReduce Druid集群。

登录后默认为英文界面,可单击右上角的国旗图标选择合适的语言。接下来在上方菜单栏中依次选择数据源 > Druid 集群来添加一个E-MapReduce Druid集群。


配置好协调机(Coordinator)和代理机(Broker)的地址,注意E-MapReduce中默认端口均为相应的开源端口前加数字1,例如开源Broker 端口为8082,E-MapReduce中为18082。


  1. 刷新或者添加新数据源。

添加好E-MapReduce Druid集群之后,您可以单击数据源 > 扫描新的数据源,这时E-MapReduce Druid集群上的数据源(datasource)就可以自动被加载进来。

您也可以在界面上单击数据源 > Druid 数据源自定义新的数据源(其操作等同于写一个data source ingestion的json文件),步骤如下。

自定义数据源时需要填写必要的信息,然后保存。

保存之后单击左侧set,编辑该数据源,填写相应的维度列与指标列等信息。

4. 查询E-MapReduce Druid。

数据源添加成功后,单击数据源名称,进入查询页面进行查询。


  1. (可选)将E-MapReduce Druid作为E-MapReduce Druid数据库使用。

Superset提供了SQLAlchemy以多种方言支持各种各样的数据库,其支持的数据库类型如下表所示。、


Superset亦支持该方式访问E-MapReduce Druid,E-MapReduce Druid对应的 SQLAlchemy URI为druid://emr-header-1:18082/druid/v2/sql,如下图所示,将E-MapReduce Druid作为一个数据库添加。


接下来就可以在SQL工具箱里用SQL进行查询了。

以上便是本次分享的全部内容,欢迎小伙伴们评论区留言讨论!

巨人的肩膀

Bilibili:大数据小讲堂 Apache Druid

宇宙湾:Apache Druid 一款高效的 OLAP 引擎

尚硅谷:Apache Druid 教程

阿里云 :大数据 Apache Druid 架构原理文档


浅谈数仓模型(维度建模)


实时数仓不需要保障时效?


字节跳动,5面,终于拿下!


关注不迷路~ 各种干货、资源定期分享!


关于我们:本公众号致力于建设大数据领域技术资源共享平台,3w+关注,保持日更,每天08:16发文,为您提供优秀高质量的大数据领域的分享。欢迎推荐给同行朋友,加群或投稿或转载可加v:iom1128,备注:数据,谢谢!

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存